package main

import (
	"context"
	"encoding/binary"
	"fmt"
	"net"
	"sync"
	"sync/atomic"
	"time"

	"git.dev.tencent.com/lwch/torrent"

	"git.dev.tencent.com/lwch/runtime"
)

// client holds the state kept for one connected peer. The embedded RWMutex
// is taken by handle when it updates choke/interested and by
// clientsType.choose when it reads choke.
type client struct {
	sync.RWMutex
	// c is the connection to the remote peer; all reads and writes go through it.
	c          net.Conn
	// have maps a piece index to whether the peer has announced that piece.
	// newClient creates one entry per piece, parseBitfield updates the entries.
	have       map[int]bool
	// ctx is cancelled by close and when handle returns; cancel is its cancel func.
	ctx        context.Context
	cancel     context.CancelFunc
	// choke starts as true and follows the choke/unchoke messages received
	// from the peer.
	choke      bool
	// interested follows the interested/uninterested messages received from
	// the peer.
	interested bool
	// working counts block requests sent by download that have not yet been
	// balanced by ignore. It is only accessed through sync/atomic.
	working    int64
}

// newClient wraps an established peer connection in a client. The returned
// client starts choked and not interested, owns a fresh cancellable context,
// and has an explicit false entry in have for every index in [0, nPieces).
// pieceLength is accepted but not used by this function.
func newClient(c net.Conn, nPieces, pieceLength int) *client {
	ctx, cancel := context.WithCancel(context.Background())
	cli := &client{
		c:      c,
		have:   make(map[int]bool),
		ctx:    ctx,
		cancel: cancel,
		choke:  true,
	}
	// parseBitfield treats len(have) as the piece count, so every index
	// needs an entry even though its value is still false.
	for piece := 0; piece < nPieces; piece++ {
		cli.have[piece] = false
	}
	return cli
}

// close shuts the peer connection and then cancels the client's context.
func (c *client) close() {
	// The Close error is intentionally dropped: the connection is being
	// abandoned regardless of the outcome.
	_ = c.c.Close()
	c.cancel()
}

// download sends a request message for the block starting at offset inside
// piece idx and counts it as an outstanding request.
//
// While more than 500 requests are outstanding the call waits, re-checking
// once per second. The wait also ends when the client's context is
// cancelled: after that nothing decrements the counter any more, so waiting
// on would block the caller forever. In that case the context error is
// handed to runtime.Assert, the same way a failed write is reported below.
func (c *client) download(idx, offset int) {
	for atomic.LoadInt64(&c.working) > 500 {
		select {
		case <-c.ctx.Done():
			// NOTE(review): runtime.Assert presumably panics on a non-nil
			// error; the return covers the case where it does not.
			runtime.Assert(c.ctx.Err())
			return
		case <-time.After(time.Second):
		}
	}
	data := make([]byte, 17)             // length(header)+type+index+offset+length
	binary.BigEndian.PutUint32(data, 13) // type+index+offset+length
	data[4] = msgRequest
	binary.BigEndian.PutUint32(data[5:], uint32(idx))
	binary.BigEndian.PutUint32(data[9:], uint32(offset))
	binary.BigEndian.PutUint32(data[13:], blockSize)
	_, err := c.c.Write(data)
	runtime.Assert(err)
	runtime.Log("[%s]starting download, idx=%d, offset=%d", c.c.RemoteAddr(), idx, offset)
	atomic.AddInt64(&c.working, 1)
}

// parseBitfield copies a peer bitfield into c.have and returns how many
// pieces were marked as present. Bit 0x80 of data[i] describes piece i*8 and
// bit 0x01 describes piece i*8+7. Indexes at or beyond len(c.have) are
// skipped. The caller is responsible for any locking.
func (c *client) parseBitfield(data []byte) int {
	found := 0
	for byteIdx, bits := range data {
		// Go from the least significant bit (last piece of the byte) up to
		// the most significant one (first piece of the byte).
		for shift := 0; shift < 8; shift++ {
			piece := byteIdx*8 + 7 - shift
			if piece >= len(c.have) {
				continue
			}
			present := bits&(1<<uint(shift)) > 0
			c.have[piece] = present
			if present {
				found++
			}
		}
	}
	return found
}

// read fills data completely from the peer connection, issuing as many Read
// calls as it takes. Every Read error is passed to runtime.Assert.
func (c *client) read(data []byte) {
	for filled := 0; filled < len(data); {
		n, err := c.c.Read(data[filled:])
		runtime.Assert(err)
		filled += n
	}
}

// handle is the receive loop for one peer. It reads messages framed as a
// 4-byte big-endian length followed by that many payload bytes, where the
// first payload byte is the message type, and dispatches on the type.
//
// The loop ends when the client's context is cancelled or when anything
// panics (the panic is recovered and discarded); on exit the context is
// cancelled. Each iteration arms a one-minute read deadline.
func (c *client) handle() {
	defer func() {
		recover()
	}()
	defer c.cancel()
	conn := c.c
	msgLength := make([]byte, 4)
	for {
		select {
		case <-c.ctx.Done():
			return
		default:
		}
		runtime.Assert(conn.SetReadDeadline(time.Now().Add(time.Minute)))
		c.read(msgLength)
		length := binary.BigEndian.Uint32(msgLength)
		if length == 0 {
			// A keepalive has no payload and therefore no type byte, so it
			// has to be handled before data[0] is inspected.
			runtime.Log("[%s]received keepalive message", conn.RemoteAddr())
			continue
		}
		// NOTE(review): length comes straight from the peer and is not
		// bounded here; consider rejecting implausibly large messages.
		data := make([]byte, length)
		c.read(data)
		switch data[0] {
		case msgChoke:
			c.Lock()
			c.choke = true
			c.Unlock()
			runtime.Log("[%s]received choke message", conn.RemoteAddr())
		case msgUnChoke:
			c.Lock()
			c.choke = false
			c.Unlock()
			runtime.Log("[%s]received unchoke message", conn.RemoteAddr())
		case msgInterested:
			c.Lock()
			c.interested = true
			c.Unlock()
			runtime.Log("[%s]received interested message", conn.RemoteAddr())
		case msgUnInterested:
			c.Lock()
			c.interested = false
			c.Unlock()
			runtime.Log("[%s]received uninterested message", conn.RemoteAddr())
		case msgHave:
			// Only logged; the payload is not applied to c.have here.
			runtime.Log("[%s]received have message", conn.RemoteAddr())
		case msgBitfield:
			// Skip the type byte so bit 0x80 of the first bitfield byte maps
			// to piece 0, and update have under the client lock like the
			// other shared fields.
			c.Lock()
			n := c.parseBitfield(data[1:])
			c.Unlock()
			runtime.Log("[%s]parse bitfield ok, found %d pieces", conn.RemoteAddr(), n)
		case msgRequest:
			runtime.Log("[%s]received request message", conn.RemoteAddr())
		case msgPiece:
			// payload: type(1) + piece index(4) + offset(4) + block bytes
			idx := binary.BigEndian.Uint32(data[1:])
			offset := binary.BigEndian.Uint32(data[5:])
			runtime.Log("[%s]received piece message, idx=%d, offset=%d, length=%d", conn.RemoteAddr(), idx, offset, len(data)-9)
			save(idx, offset, data[9:])
			c.ignore()
			blocking.done(int(idx), int(offset))
		case msgCancel:
			runtime.Log("[%s]received cancel message", conn.RemoteAddr())
		case msgPort:
			runtime.Log("[%s]received port message", conn.RemoteAddr())
		default:
			runtime.Log("[%s]received unknown message: %d, length=%d", conn.RemoteAddr(), data[0], length)
		}
	}
}

// ignore atomically lowers the outstanding-request counter by one.
func (c *client) ignore() {
	const finished = -1
	atomic.AddInt64(&c.working, finished)
}

// clientsType is a registry of peer clients. The embedded RWMutex guards
// data.
type clientsType struct {
	sync.RWMutex
	// data maps a peer's "host:port" string, as built by get, to its client.
	data map[string]*client
}

// makeHandshake builds the 68-byte handshake sent to a peer: one length byte
// (19), the protocol name, 8 zero bytes, hash at offset 28 and PID at
// offset 48.
func makeHandshake(hash []byte) []byte {
	const (
		protocol   = "BitTorrent protocol"
		hashOffset = 1 + len(protocol) + 8 // length byte + name + reserved
		idOffset   = hashOffset + 20
		total      = idOffset + 20
	)
	msg := make([]byte, total)
	msg[0] = byte(len(protocol))
	copy(msg[1:], protocol)
	copy(msg[hashOffset:], hash)
	copy(msg[idOffset:], PID)
	return msg
}

// readHandshake reads the remote peer's handshake from c and returns the
// peer id (the final 20 bytes) as a string.
//
// The handshake is one length byte n, n bytes of protocol name, 8 reserved
// bytes, a 20-byte info hash and a 20-byte peer id. Each part is read until
// it is complete, because a single Read on a stream connection may return
// fewer bytes than requested. Sizes are computed as int so that a large
// length byte cannot wrap around.
func readHandshake(c net.Conn) (string, error) {
	// readFull keeps reading until buf is full or an error leaves it short.
	readFull := func(buf []byte) error {
		for off := 0; off < len(buf); {
			n, err := c.Read(buf[off:])
			off += n
			if err != nil && off < len(buf) {
				return err
			}
		}
		return nil
	}

	var pstrlen [1]byte
	if err := readFull(pstrlen[:]); err != nil {
		return "", err
	}
	nameLen := int(pstrlen[0])
	rest := make([]byte, nameLen+48) // name+reserved+info_hash+id
	if err := readFull(rest); err != nil {
		return "", err
	}
	return string(rest[nameLen+28:]), nil
}

// get returns the client registered for ip:port, dialing and handshaking a
// new connection when none exists. The boolean is true only when a new
// client was created by this call.
//
// A dial or handshake-read failure yields (nil, false). A handshake-write
// failure is handed to runtime.Assert. In both handshake failure cases the
// freshly dialed connection is closed first so that it is not leaked. If
// another client was registered for the same address in the meantime, it is
// closed and replaced.
func (cs *clientsType) get(ip net.IP, port uint16, t torrent.Torrent, hash []byte) (*client, bool) {
	// JoinHostPort brackets IPv6 literals, which a plain "%s:%d" would turn
	// into an address that cannot be dialed; IPv4 output is unchanged.
	remote := net.JoinHostPort(ip.String(), fmt.Sprintf("%d", port))
	cs.RLock()
	c := cs.data[remote]
	cs.RUnlock()
	if c != nil {
		return c, false
	}
	d := net.Dialer{Timeout: 10 * time.Second}
	conn, err := d.Dial("tcp", remote)
	if err != nil {
		return nil, false
	}
	if _, err = conn.Write(makeHandshake(hash)); err != nil {
		conn.Close()
		// NOTE(review): runtime.Assert presumably panics on a non-nil error;
		// the return covers the case where it does not.
		runtime.Assert(err)
		return nil, false
	}
	id, err := readHandshake(conn)
	if err != nil {
		conn.Close()
		return nil, false
	}
	runtime.Log("Handshake ok, remote_addr=%s, remote_id=%s", conn.RemoteAddr().String(), id)
	c = newClient(conn, len(t.Info.Pieces), t.Info.PieceLength)
	cs.Lock()
	if old := cs.data[remote]; old != nil {
		old.close()
	}
	cs.data[remote] = c
	cs.Unlock()
	return c, true
}

// choose returns the clients in this registry that are currently unchoked
// and have piece idx. The result is nil when no client qualifies, and its
// order follows map iteration and is therefore unspecified.
func (cs *clientsType) choose(idx int) []*client {
	var ret []*client
	cs.RLock()
	defer cs.RUnlock()
	// Iterate the receiver's own map rather than the package-level registry.
	for _, c := range cs.data {
		// Read both fields under the client's lock so the lookup in have
		// does not run unguarded.
		c.RLock()
		usable := !c.choke && c.have[idx]
		c.RUnlock()
		if usable {
			ret = append(ret, c)
		}
	}
	return ret
}

// clients is the package-level registry of peer clients. Its map is
// allocated as part of the declaration so entries can be stored right away.
var clients = clientsType{data: make(map[string]*client)}
